Fork me on GitHub

Kafka无消息丢失配置

1. 背景

在实际开发过程中遇到了客户购买物联网平台能力的需求,因为客户购买操作在增值服务平台,能力开通在我们团队的应用中,常规的方案是在购买成功后发送 Kafka 消息,由我们团队订阅消息,完成后续的操作。

由于涉及到用户付款操作,所以需要保证消息不丢失。

2. 哪些步骤可能会丢消息

2.1 生产者丢消息

Kafka Producer 其实是异步发送消息的,如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。

该方法只是将消息写入 Buffer Pool 中,会由后台线程批量将消息发送给 Broker,如果消息还未发送,此时宕机,Buffer Pool 中的数据会丢失;如果在后台线程发送给 Broker 的过程中失败了,业务中同样无法精确的感知到。

推荐使用带回调函数的 send 方法,一旦出现消息提交失败的情况,我们可以有针对性地进行处理。

部分示例代码:

1
2
3
4
5
6
7
8
kafkaProducer.send(new ProducerRecord(TOPIC, i + "", i + ""), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (null != exception) {
// 这里区分exception,评估是否需要重试,引入监控和告警
}
}
});

如果回调函数中异常不为空,需要根据异常的类型进行相应的处理,如果是消息体有问题(格式不对或者大小越界),证明 Producer 的代码有问题,消息需要重新定义;如果是 Broker 网络问题,可以进行重试。

针对 exception 的处理可以通过日志或者 trace 进行记录,方便监控和告警。

2.2 Broker丢消息

对于 Broker,一句话概括,Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。

对这句话做以下解释:

  1. 已提交消息:当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。
  2. 若干个 Broker:这个取决于 Producer 和 Broker 的相关配置,其中,Producer 可以通过 acks 的配置指定了是否 Broker 是否写日志文件以及同步数据到 Follower;Broker 可以通过 replication.factor 的配置指定副本个数,通过 min.insync.replicas 的配置指定需要同步多少副本才算是已提交消息。

当然,如果出现 Kafka 集群中的所有 Broker 同时全部宕机这种极端情况,消息还是有丢失的风险。所以,Kafka 只做有限度的持久化保证。

2.3 消费者丢消息

消费者丢消息主要是自动 ack 。

3. 生产环境无消息丢失配置

配置涉及到 Producer、Broker 以及 Consumer。

3.1 Producer

  • 不要使用 producer.send(msg),推荐使用 producer.send(msg, callback)
  • 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”
  • 设置 retries 为一个较大的值

3.2 Broker 端

在配置之前,需要和负责 Kafka 的中间件同学沟通,拿到目前 Broker 的相关参数配置,如果不符合你的需求,看是否能做相应的调整。

  • 设置 unclean.leader.election.enable = false,不允许非 ISR 中的副本被选举为 Leader,如果一个 Broker 数据落后 Leader 太多,那么一旦它成为新的 Leader,必然会造成数据丢失
  • 设置 replication.factor >= 3,该参数决定副本数量,最好将消息多保存几份,增加冗余
  • 设置 min.insync.replicas > 1,该参数定义消息至少要被写入到多少个副本才算是“已提交”,推荐 replication.factor = min.insync.replicas + 1

我咨询了我司中间件同学,replication.factor 设置值为 3,min.insync.replicas 设置值为 2,unclean.leader.election.enable 设置值为 false,因此是满足无消息丢失的配置的。

3.3. Consumer

  • 设置 enable.auto.commit=false,手动 ack

3.4 小提醒

以我这些年和中间件打交道的经验来看,除非是 Kafka 服务端的配置十分不合理,否则要想说服中间件的同学改 Broker 的配置是十分困难的。

因此,如果没办法保证整个 Kafka 的配置都满足无消息丢失的完美配置,需要针对某些可能丢消息的场景做兜底方案,比如系统对账甚至人工介入等。

Kafka 客户端详细的配置可参见之前的文章——Kafka调优与详细参数说明

4. 参考

《Apache Kafka 实战》

本文标题:Kafka无消息丢失配置

原始链接:https://zhaoxiaofa.com/2021/09/23/Kafka 无消息丢失配置/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。